Understanding ActiveMQ and Its Messaging Protocols: STOMP, AMQP, MQTT

Preface

Amazon MQ is a fully managed ActiveMQ service. I recently needed to use it, so I studied its documentation and tried out its features. Since ActiveMQ supports a rich set of protocols (OpenWire, AMQP, STOMP, MQTT), I also looked into the characteristics of each protocol and its SDKs.

Installation

The most convenient way to run it locally is, of course, Docker. The rmohr/activemq image has good documentation and provides a tag for 5.15.6, a version also supported by Amazon MQ.

Note that you first need to follow the few steps in the image's Docker Hub documentation to copy the default configuration files out of the image into a local conf directory such as /usr/local/activemq/conf. After that, you can quickly start an ActiveMQ server with the default configuration:

# active mq
docker run -itd --name activemq \
-p 61616:61616 -p 8161:8161 -p 5672:5672 -p 61613:61613 -p 1883:1883 -p 61614:61614 \
-v /usr/local/activemq/conf:/opt/activemq/conf \
-v /usr/local/activemq/data:/opt/activemq/data \
rmohr/activemq:5.15.6

Features

Advisory

ActiveMQ can deliver some of its own events to system destinations, such as the creation of a queue/topic, or a queue/topic without consumers. See http://activemq.apache.org/advisory-message.html

This feature is very useful for monitoring the broker. It is off in the default configuration and needs to be enabled in activemq.xml.

Wildcards

Wildcard characters:

. separates the words in a name
* matches any single word; it does not match across the dot (.)
> matches anything that follows, including dots (.); it is a prefix match, and nothing may follow the > sign.

Wildcards can be used in the configuration file to define the scope of a setting, and also in the destination name when subscribing. This is a very handy feature.
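For instance, a policyEntry in activemq.xml can use > to cover a whole name prefix at once (the destination name and limit here are illustrative, not from the stock config):

```xml
<!-- applies to order.created, order.paid.retry, and anything else under order. -->
<policyEntry queue="order.>" memoryLimit="64mb"/>
```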

Virtual Topic

A virtual topic turns a normal topic into multiple queues. If TopicA has virtual topics enabled, consumers can consume messages from queues matching the pattern Consumer.xxx.TopicA. (http://activemq.apache.org/virtual-destinations.html)

Here xxx is analogous to the Channel concept in NSQ.

The scope, prefix, and other options of the virtualDestinationInterceptor need to be configured in activemq.xml.

  • name=">" means the virtual-topic feature is enabled for every topic.

  • prefix="Consumer.*." means the queue pattern consumers can subscribe to is Consumer.*.<topic name>.

<destinationInterceptors>
  <virtualDestinationInterceptor>
    <virtualDestinations>
      <virtualTopic name=">" prefix="Consumer.*." selectorAware="false"/>
    </virtualDestinations>
  </virtualDestinationInterceptor>
</destinationInterceptors>

Delay & Schedule

ActiveMQ supports delayed and scheduled messages: just add the following fields to the message header. The maximum value of AMQ_SCHEDULED_PERIOD is the maximum value of a long, so very long delays can be configured.

Property name type description
AMQ_SCHEDULED_DELAY long The time in milliseconds that a message will wait before being scheduled to be delivered by the broker
AMQ_SCHEDULED_PERIOD long The time in milliseconds to wait after the start time to wait before scheduling the message again
AMQ_SCHEDULED_REPEAT int The number of times to repeat scheduling a message for delivery
AMQ_SCHEDULED_CRON String Use a Cron entry to set the schedule

Dead Letter Queue

If the broker delivers a message to a consumer and gets neither an ACK nor a NACK, the message is redelivered; after a certain number of redeliveries it goes to the dead letter queue. By default there is only one shared dead letter queue, ActiveMQ.DLQ. To give each destination its own dead letter queue, modify activemq.xml:

<broker>
   
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <!-- Set the following policy on all queues using the '>' wildcard -->
        <policyEntry queue=">">
          <deadLetterStrategy>
            <!--
              Use the prefix 'DLQ.' for the destination name, and make
              the DLQ a queue rather than a topic
            -->
            <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
          </deadLetterStrategy>
        </policyEntry>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
</broker>

By default, messages on non-persistent topics do not go to the dead letter queue. If you need that, modify activemq.xml and add:

<!-- 
Tell the dead letter strategy to also place non-persisted messages
onto the dead-letter queue if they can't be delivered.
-->
<deadLetterStrategy>
<... processNonPersistent="true" />
</deadLetterStrategy>

Practice

STOMP

STOMP is short for Simple (or Streaming) Text Orientated Messaging Protocol. Its design borrows from HTTP: it is frame based and text based, with HTTP-like concepts such as content-type, headers, and body. The specification (https://stomp.github.io/stomp-specification-1.2.html) is very concise; it fits on a single page.

Protocol details and characteristics:

  1. For duplicated header keys, only the first occurrence takes effect.
  2. The server may limit the message size, the number of header fields, and the header length.
  3. When one client opens multiple subscribers, a subscribe id must be set.
  4. The NACK command means requeue.
  5. STOMP has the concept of transactions: the messages from a producer up to the broker's acknowledgement form one transaction, and delivery from the broker to the consumer's ACK forms another; transactions are atomic.
  6. SSL is supported.

ActiveMQ as a STOMP server

  1. Supports STOMP protocol v1.1.

  2. The default limits are maxDataLength=104857600 and maxFrameSize=MAX_LONG.

  3. Whether a destination is a queue (producer/consumer model) or a topic (publish/subscribe model) is determined by the /queue/ or /topic/ prefix of its name. The real name is what remains after stripping the prefix, including both / characters.

  4. Sending is not persistent by default; set the persistent:true header on SEND to enable persistence.

    Subscriptions are not durable by default; set the activemq.subscriptionName:<subscriber name> header on SUBSCRIBE to create a durable subscription.

    Many features are driven by STOMP headers; the official ActiveMQ documentation has two sections on them: http://activemq.apache.org/stomp.html#Stomp-StompExtensionsforJMSMessageSemantics

SDK

https://github.com/go-stomp/stomp is currently the most-starred Go STOMP library. While using it:

  1. I submitted a PR: https://github.com/go-stomp/stomp/pull/58
  2. and fixed an issue: https://github.com/go-stomp/stomp/issues/47

Demo code

package main

import (
	"context"
	"os"
	"os/signal"
	"strconv"
	"sync"
	"syscall"
	"time"

	"github.com/go-stomp/stomp"
	"github.com/hanjm/log"
)

func main() {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())

	wg.Add(1)
	go func() {
		defer wg.Done()
		publisher(ctx, "/topic/stomp")
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		Subscriber(ctx, "channel1", "Consumer.channel1.stomp")
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		Subscriber(ctx, "channel2", "Consumer.channel2.stomp")
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		Subscriber(ctx, "channel3", "/topic/stomp")
	}()

	defer func() {
		cancel()
		wg.Wait()
	}()
	SignalsListen()
}

func publisher(ctx context.Context, destination string) {
	conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
	if err != nil {
		log.Fatal(err)
		return
	}
	defer conn.Disconnect()
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Second):
			err = conn.Send(
				destination,  // destination
				"text/plain", // content-type
				[]byte("Test message #"+strconv.Itoa(i)), // body
				stomp.SendOpt.Header("persistent", "true"))
			if err != nil {
				log.Error(err)
				return
			}
		}
	}
}

func Subscriber(ctx context.Context, clientID string, destination string) {
	conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
	if err != nil {
		log.Fatal(err)
		return
	}
	defer conn.Disconnect()
	sub, err := conn.Subscribe(destination, stomp.AckClientIndividual,
		stomp.SubscribeOpt.Id(clientID),
		stomp.SubscribeOpt.Header("persistent", "true"))
	if err != nil {
		log.Fatal(err)
		return
	}
	go func() {
		<-ctx.Done()
		if err := sub.Unsubscribe(); err != nil {
			log.Fatal(clientID, err)
		}
	}()
	for m := range sub.C {
		if m.Err != nil {
			log.Fatal(m.Err)
			return
		}
		log.Infof("%s msg body:%s", clientID, m.Body)
		//log.Infof("%s msg header:%s", clientID, *m.Header)
		//log.Infof("%s msg content-type:%s", clientID, m.ContentType)
		//log.Infof("%s msg destination:%s", clientID, m.Destination)
		m.Conn.Ack(m)
	}
	log.Info("close sub")
}

func SignalsListen() {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGQUIT,
		syscall.SIGTERM,
		syscall.SIGINT,
		syscall.SIGUSR1,
		syscall.SIGUSR2)

	switch <-sigs {
	case syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT:
		log.Info("service close")
	}
}

MQTT

Protocol specification: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html

Chinese translation: https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/01-Introduction.html

Protocol details and characteristics:

  1. The transport can be TCP or WebSocket, which is why it targets IoT scenarios.
  2. Only the publish/subscribe model is supported; there is no producer/consumer (queue) model.
  3. QOS expresses the delivery semantics: QOS=0 means at most once, QOS=1 at least once, and QOS=2 exactly once.

ActiveMQ as an MQTT server

  1. The wildcards differ: MQTT's / + # correspond to ActiveMQ's . * > respectively.
  2. QOS=0 maps to a non-persistent topic; QOS=1 and QOS=2 map to persistent topics.
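The mapping in point 1 is mechanical enough to capture in a small helper (the topic names are made up):

```go
package main

import (
	"fmt"
	"strings"
)

// mqttToActiveMQ converts an MQTT topic filter to the equivalent
// ActiveMQ destination name: '/' -> '.', '+' -> '*', '#' -> '>'.
func mqttToActiveMQ(filter string) string {
	r := strings.NewReplacer("/", ".", "+", "*", "#", ">")
	return r.Replace(filter)
}

func main() {
	fmt.Println(mqttToActiveMQ("sensor/+/temp")) // sensor.*.temp
	fmt.Println(mqttToActiveMQ("sensor/#"))      // sensor.>
}
```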

AMQP

Protocol specification: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html

AMQP is much more complex than STOMP and MQTT; after all, the name is Advanced Message Queuing Protocol.

Protocol details and characteristics:

  1. AMQP has many concepts of its own, such as Link, Container, and Node. Using an SDK directly without reading the model documentation is rough going. ContainerID corresponds to the ActiveMQ client ID, and LinkName corresponds to the ActiveMQ subscription name.

ActiveMQ as an AMQP server

  1. ActiveMQ speaks AMQP 1.0, so the popular ~2k-star SDK for 0.9.1 (https://github.com/streadway/amqp) cannot be used; upstream also sees no need to support the older protocol versions.
  2. The default limits are maxDataLength=104857600 (100MB) and maxFrameSize=MAX_LONG; a consumer can hold at most prefetch=1000 unacknowledged messages, and producerCredit is 10000. These can be set via the connection URI.
  3. SSL is supported.
  4. Whether a destination is a queue (producer/consumer model) or a topic (publish/subscribe model) is determined by the queue:// or topic:// prefix of its name. The real name is what remains after stripping the prefix, including the two / characters.
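These limits live on the AMQP transportConnector URI in activemq.xml; the stock configuration ships with something like the following, and other wireFormat options can be appended the same way:

```xml
<transportConnector name="amqp"
    uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
```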

Performance

Using

github.com/vcabbage/amqp            76 stars,  13 issues,  5 contributors
github.com/go-stomp/stomp           132 stars,  3 issues, 14 contributors
github.com/eclipse/paho.mqtt.golang 650 stars, 20 issues, 34 contributors

as the SDKs, I benchmarked an ordinary scenario: pub and sub of 1KB messages.

For publish performance, amqp ≈ stomp > mqtt: amqp and stomp are close, and both are more than twice as fast as mqtt.
For subscribe performance, amqp is slightly faster than stomp, while mqtt is much slower.

Benchmark code

package all_bench

import (
	"bytes"
	"context"
	"sync/atomic"
	"testing"
	"time"

	"github.com/eclipse/paho.mqtt.golang"
	"github.com/go-stomp/stomp"
	"github.com/hanjm/log"
	"pack.ag/amqp"
)

var msgData = bytes.Repeat([]byte("1"), 1024)

var (
	stompDestination = "bench-stomp"
	amqpDestination  = "bench-amqp"
	mqttDestination  = "bench-mqtt"
	pubMsgCount      = 20000
	subMsgCount      = 100
)

func TestMain(m *testing.M) {
	m.Run()
}

// go test -bench Publish -benchmem
// go test -bench Sub -benchmem
func BenchmarkStompPublish(b *testing.B) {
	conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
	if err != nil {
		log.Fatal(err)
		return
	}
	defer conn.Disconnect()

	b.N = pubMsgCount
	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		err = conn.Send(
			stompDestination, // destination
			"text/plain",     // content-type
			msgData)          // body
		if err != nil {
			log.Error(err)
			return
		}
	}
}

func BenchmarkAmqpPublish(b *testing.B) {
	// Create client
	client, err := amqp.Dial("amqp://127.0.0.1",
		amqp.ConnSASLPlain("system", "manager"),
	)
	if err != nil {
		log.Fatal("Dialing AMQP server:", err)
	}
	defer client.Close()

	// Open a session
	session, err := client.NewSession()
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}
	defer func() {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		err = session.Close(ctx)
		if err != nil {
			log.Errorf("failed to close session:%s", err)
			return
		}
	}()

	// Create a sender
	sender, err := session.NewSender(
		amqp.LinkTargetAddress(amqpDestination),
		amqp.LinkSourceDurability(amqp.DurabilityUnsettledState),
		amqp.LinkSourceExpiryPolicy(amqp.ExpiryNever),
	)
	if err != nil {
		log.Fatal("Creating sender link:", err)
	}

	defer func() {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		err := sender.Close(ctx)
		if err != nil {
			log.Errorf("failed to close sender:%s", err)
			return
		}
	}()

	ctx := context.Background()
	msg := amqp.NewMessage(msgData)

	b.N = pubMsgCount
	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		// Send message
		err = sender.Send(ctx, msg)
		if err != nil {
			log.Fatal("Sending message:", err)
		}
	}
}

func BenchmarkMqttPublish(b *testing.B) {
	opt := mqtt.NewClientOptions().SetClientID("pubClient").SetCleanSession(false)
	opt.AddBroker("tcp://127.0.0.1:1883")
	client := mqtt.NewClient(opt)
	t := client.Connect()
	if t.Wait() {
		if err := t.Error(); err != nil {
			log.Fatal(err)
			return
		}
	}
	defer func() {
		client.Disconnect(10000)
	}()

	b.N = pubMsgCount
	b.ReportAllocs()
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		token := client.Publish(mqttDestination, 2, true, msgData)
		err := token.Error()
		if err != nil {
			log.Fatal(err)
			return
		}
	}
}

func BenchmarkStompSubscriber(b *testing.B) {
	conn, err := stomp.Dial("tcp", "127.0.0.1:61613")
	if err != nil {
		log.Fatal(err)
		return
	}
	clientID := "1"
	//defer conn.Disconnect()
	sub, err := conn.Subscribe(stompDestination, stomp.AckClientIndividual, stomp.SubscribeOpt.Id(clientID))
	if err != nil {
		log.Fatal(err)
		return
	}
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
	defer cancel()

	b.N = subMsgCount
	b.ReportAllocs()
	b.ResetTimer()
	defer b.StopTimer()
	var i int64 = 0

	go func() {
		for range time.Tick(time.Second) {
			if atomic.LoadInt64(&i) >= int64(b.N) {
				cancel()
			}
		}
	}()

	for {
		select {
		case m := <-sub.C:
			if m.Err != nil {
				log.Fatal(m.Err)
				return
			}
			m.Conn.Ack(m)
			// the counter is read from the watcher goroutine above,
			// so it must be updated atomically
			atomic.AddInt64(&i, 1)
			if atomic.LoadInt64(&i) > int64(b.N) {
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

func BenchmarkAmqpSubscriber(b *testing.B) {
	// Create client
	client, err := amqp.Dial("amqp://127.0.0.1",
		amqp.ConnSASLPlain("system", "manager"),
	)
	if err != nil {
		log.Fatal("Dialing AMQP server:", err)
	}
	//defer client.Close()

	// Open a session
	session, err := client.NewSession()
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}

	clientID := "1"
	defer func() {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		err := session.Close(ctx)
		if err != nil {
			log.Errorf("%s failed to close session:%s", clientID, err)
			return
		}
	}()

	// Create a receiver to continuously read messages
	receiver, err := session.NewReceiver(
		amqp.LinkSourceAddress(amqpDestination),
		amqp.LinkCredit(10),
	)
	if err != nil {
		log.Fatal("Creating receiver link:", err)
	}
	defer func() {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		err := receiver.Close(ctx)
		if err != nil {
			log.Errorf("%s failed to close receiver:%s", clientID, err)
			return
		}
	}()
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
	defer cancel()

	b.N = subMsgCount
	b.ReportAllocs()
	b.ResetTimer()
	defer b.StopTimer()
	var i int64 = 0

	go func() {
		for range time.Tick(time.Second) {
			if atomic.LoadInt64(&i) >= int64(b.N) {
				cancel()
			}
		}
	}()
	for {
		// Receive next message
		msg, err := receiver.Receive(ctx)
		if err != nil {
			if err == context.Canceled {
				log.Infof("Reading message from AMQP:%s", err)
				break
			}
			log.Errorf("Reading message from AMQP:%s", err)
			break
		}
		// Accept message
		msg.Accept()
		atomic.AddInt64(&i, 1)
		if atomic.LoadInt64(&i) > int64(b.N) {
			return
		}
	}
}

func BenchmarkMqttSubscriber(b *testing.B) {
	opt := mqtt.NewClientOptions().SetClientID("subClient").SetCleanSession(false)
	opt.AddBroker("tcp://127.0.0.1:1883")
	client := mqtt.NewClient(opt)
	t := client.Connect()
	if t.Wait() {
		if err := t.Error(); err != nil {
			log.Fatal(err)
			return
		}
	}
	defer func() {
		client.Disconnect(1000)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
	defer cancel()

	b.N = subMsgCount
	b.ReportAllocs()
	b.ResetTimer()
	defer b.StopTimer()
	var i int64 = 0

	go func() {
		for range time.Tick(time.Second) {
			if atomic.LoadInt64(&i) >= int64(b.N) {
				cancel()
			}
		}
	}()
	client.Subscribe(mqttDestination, 2, func(c mqtt.Client, m mqtt.Message) {
		m.Ack()
		atomic.AddInt64(&i, 1)
	})
	<-ctx.Done()
	log.Info("close sub")
}

Behavioral details

The official FAQ documents some implementation details:

  1. If the producer is fast and the consumer is slow, ActiveMQ's flow control blocks the producer. http://activemq.apache.org/what-happens-with-a-fast-producer-and-slow-consumer.html
  2. Requeueing after a consumer has taken a message is not supported, i.e. there is no NSQ-style retry when the consumer hits a business-logic error. http://activemq.apache.org/how-do-i-unack-the-message-with-stomp.html. Delayed messages can be used to build something similar, though.
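The delayed-message workaround for point 2 can be sketched as follows: on a business error, republish the message with an AMQ_SCHEDULED_DELAY header and then ACK the original. The sender interface and all names here are illustrative, not part of any SDK:

```go
package main

import (
	"fmt"
	"strconv"
)

// sender abstracts the publish side (e.g. a go-stomp connection).
type sender interface {
	Send(destination string, body []byte, headers map[string]string) error
}

// retryLater republishes body with an increasing delay, emulating NSQ-style
// requeue. attempt is 0-based; the delay doubles each attempt, starting at 1s.
func retryLater(s sender, destination string, body []byte, attempt int) error {
	delayMs := 1000 << uint(attempt) // 1s, 2s, 4s, ...
	return s.Send(destination, body, map[string]string{
		"AMQ_SCHEDULED_DELAY": strconv.Itoa(delayMs),
	})
}

// fakeSender records the headers, just for demonstration.
type fakeSender struct{ lastHeaders map[string]string }

func (f *fakeSender) Send(dest string, body []byte, h map[string]string) error {
	f.lastHeaders = h
	return nil
}

func main() {
	f := &fakeSender{}
	retryLater(f, "/queue/orders", []byte("payload"), 2)
	fmt.Println(f.lastHeaders["AMQ_SCHEDULED_DELAY"]) // 4000
}
```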

Performance tuning

  1. If you use virtual topics, then under the default configuration sending gets slower as the number of queues behind a virtual topic grows, because the forwarding from the virtual topic to its queues is serial by default. Set concurrentSend=true to forward to the queues concurrently.

    https://activemq.apache.org/virtual-destinations
    https://issues.jboss.org/browse/ENTMQ-1093
    https://github.com/apache/activemq/blob/9abe2c6f97c92fc99c5a2ef02846f62002a671cf/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java#L87

  2. Set concurrentStoreAndDispatchQueues to false. It defaults to true; according to the documentation this speeds up persistent messaging when consumers are fast, because a message that is consumed quickly never needs to hit the disk. In my tests, however, with it set to true, 10 concurrent producers performed the same as 1. With it set to false, raising producer concurrency multiplies throughput, and single-producer performance does not degrade.

  3. Enable mKahaDB. To reduce the number of open file descriptors, ActiveMQ persists messages with a single KahaDB instance by default, but on a fast disk one instance cannot exploit the disk's potential; with multiple KahaDB instances throughput multiplies. You can assign KahaDB instances by queue-name pattern, or use perDestination="true" for one instance per destination. The latter has a pitfall: if the destination name exceeds 42 characters it gets truncated, and sends fail with an unrecoverable error. A workaround is to partition destinations across KahaDB instances manually, but that mapping cannot be changed dynamically afterwards; you have to start a new broker and migrate. Otherwise, if the assignment rule changes across a restart and a destination lands in a different KahaDB instance, its earlier data will never be consumed.

    http://sigreen.github.io/2016/02/10/amq-tuning.html

    https://activemq.apache.org/kahadb#multim-kahadb-persistence-adapter
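All three knobs live in activemq.xml. A sketch combining them, where the queue pattern and directory are illustrative and the attribute names come from the linked docs:

```xml
<broker>
  <destinationInterceptors>
    <virtualDestinationInterceptor>
      <virtualDestinations>
        <!-- 1. fan out from the virtual topic to its queues concurrently -->
        <virtualTopic name=">" prefix="Consumer.*." concurrentSend="true"/>
      </virtualDestinations>
    </virtualDestinationInterceptor>
  </destinationInterceptors>

  <persistenceAdapter>
    <!-- 3. multiple KahaDB instances, partitioned by destination pattern -->
    <mKahaDB directory="${activemq.data}/kahadb">
      <filteredPersistenceAdapters>
        <filteredKahaDB queue="order.>">
          <persistenceAdapter>
            <!-- 2. make producers wait for the disk write -->
            <kahaDB concurrentStoreAndDispatchQueues="false"/>
          </persistenceAdapter>
        </filteredKahaDB>
        <!-- catch-all for every other destination -->
        <filteredKahaDB>
          <persistenceAdapter>
            <kahaDB concurrentStoreAndDispatchQueues="false"/>
          </persistenceAdapter>
        </filteredKahaDB>
      </filteredPersistenceAdapters>
    </mKahaDB>
  </persistenceAdapter>
</broker>
```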